Aggregation — Typed and Untyped Grouping
You can group records in a Dataset by a condition (a column or a key function) and compute aggregates over each collection of grouped records.

You can also use the agg method to compute aggregations per column over the entire Dataset, i.e. without creating groups first and treating the entire Dataset as a single group.
The following aggregate operators are available:
- groupBy for untyped aggregations with Column- or String-based column names.
- groupByKey for strongly-typed aggregations where the data is grouped by a given key function.
- rollup
- cube
The untyped aggregations, e.g. groupBy, rollup, and cube, return a RelationalGroupedDataset, while groupByKey returns a KeyValueGroupedDataset.
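The difference shows up in the types. A minimal sketch, assuming the Token dataset from the Test Setup section below:

// untyped grouping returns a RelationalGroupedDataset
val byName: org.apache.spark.sql.RelationalGroupedDataset = dataset.groupBy('name)
// typed grouping returns a KeyValueGroupedDataset over the key and record types
val byNameTyped: org.apache.spark.sql.KeyValueGroupedDataset[String, Token] = dataset.groupByKey(_.name)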
Aggregates on Entire Dataset (Without Groups) — agg Operator
agg(expr: Column, exprs: Column*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
agg computes aggregate expressions over all the records in a Dataset.
scala> spark.range(10).agg(sum('id) as "sum").show
+---+
|sum|
+---+
| 45|
+---+
Note | agg is simply a shortcut for groupBy().agg(…). |
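Since agg is a shortcut, the following two queries are equivalent. A minimal sketch:

import org.apache.spark.sql.functions.sum
// a global aggregate over all records, written two ways
spark.range(10).agg(sum('id))            // the shortcut
spark.range(10).groupBy().agg(sum('id))  // what it expands to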
Grouping by Columns — groupBy Untyped Operators
groupBy(cols: Column*): RelationalGroupedDataset
groupBy(col1: String, cols: String*): RelationalGroupedDataset
groupBy methods group the records in a Dataset by the specified columns, passed as Columns or their String names. They return a RelationalGroupedDataset to apply aggregations to.
// a 10^3-record data set
val ints = 1 to math.pow(10, 3).toInt
scala> val dataset = ints.toDF("n").withColumn("m", 'n % 2)
dataset: org.apache.spark.sql.DataFrame = [n: int, m: int]
scala> dataset.count
res0: Long = 1000
scala> dataset.groupBy('m).agg(sum('n)).show
+---+------+
| m|sum(n)|
+---+------+
| 1|250000|
| 0|250500|
+---+------+
Internally, it first resolves columns and then builds a RelationalGroupedDataset.
Note | The following session uses the data setup as described in the Test Setup section below. |
scala> dataset.show
+----+---------+-----+
|name|productId|score|
+----+---------+-----+
| aaa| 100| 0.12|
| aaa| 200| 0.29|
| bbb| 200| 0.53|
| bbb| 300| 0.42|
+----+---------+-----+
scala> dataset.groupBy('name).avg().show
+----+--------------+----------+
|name|avg(productId)|avg(score)|
+----+--------------+----------+
| aaa| 150.0| 0.205|
| bbb| 250.0| 0.475|
+----+--------------+----------+
scala> dataset.groupBy('name, 'productId).agg(Map("score" -> "avg")).show
+----+---------+----------+
|name|productId|avg(score)|
+----+---------+----------+
| aaa| 200| 0.29|
| bbb| 200| 0.53|
| bbb| 300| 0.42|
| aaa| 100| 0.12|
+----+---------+----------+
scala> dataset.groupBy('name).count.show
+----+-----+
|name|count|
+----+-----+
| aaa| 2|
| bbb| 2|
+----+-----+
scala> dataset.groupBy('name).max("score").show
+----+----------+
|name|max(score)|
+----+----------+
| aaa| 0.29|
| bbb| 0.53|
+----+----------+
scala> dataset.groupBy('name).sum("score").show
+----+----------+
|name|sum(score)|
+----+----------+
| aaa| 0.41|
| bbb| 0.95|
+----+----------+
scala> dataset.groupBy('productId).sum("score").show
+---------+------------------+
|productId| sum(score)|
+---------+------------------+
| 300| 0.42|
| 100| 0.12|
| 200|0.8200000000000001|
+---------+------------------+
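Besides the Map-based variant above, agg also accepts (column name, aggregate function) pairs. A minimal sketch against the same Token dataset:

// the pair-based agg variant: one (column, function) pair per aggregation
dataset.groupBy('name).agg("score" -> "avg", "productId" -> "max")
// equivalent to dataset.groupBy('name).agg(Map("score" -> "avg", "productId" -> "max"))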
groupByKey Typed Operator
groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]
groupByKey groups records (of type T) by the given func. It returns a KeyValueGroupedDataset to apply aggregations to.
Note | groupByKey is Dataset's experimental API. |
scala> dataset.groupByKey(_.productId).count.show
+-----+--------+
|value|count(1)|
+-----+--------+
| 300| 1|
| 100| 1|
| 200| 2|
+-----+--------+
import org.apache.spark.sql.expressions.scalalang._
scala> dataset.groupByKey(_.productId).agg(typed.sum[Token](_.score)).toDF("productId", "sum").orderBy('productId).show
+---------+------------------+
|productId| sum|
+---------+------------------+
| 100| 0.12|
| 200|0.8200000000000001|
| 300| 0.42|
+---------+------------------+
RelationalGroupedDataset

RelationalGroupedDataset is also the result of executing the pivot operator on grouped records (i.e. on another RelationalGroupedDataset).
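A minimal sketch of pivot, assuming the Token dataset from the Test Setup section:

// group by name, pivot on productId, and sum scores per (name, productId) cell
dataset.groupBy('name).pivot("productId").sum("score")
// yields one row per name with a column per distinct productId (100, 200, 300)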
It offers the following operators to work on a grouped collection of records:
- agg
- count
- mean
- max
- avg
- min
- sum
- pivot
KeyValueGroupedDataset

KeyValueGroupedDataset is an experimental interface that represents the result of executing the strongly-typed operator groupByKey.
scala> val tokensByName = dataset.groupByKey(_.name)
tokensByName: org.apache.spark.sql.KeyValueGroupedDataset[String,Token] = org.apache.spark.sql.KeyValueGroupedDataset@1e3aad46
It holds the keys that the records were grouped by.
scala> tokensByName.keys.show
+-----+
|value|
+-----+
| aaa|
| bbb|
+-----+
The following methods are available for any KeyValueGroupedDataset to work on groups of records (see the mapGroups sketch after this list):

- agg (of 1 to 4 types)
- mapGroups
- flatMapGroups
- reduceGroups
- count, which is a special case of agg with the count function applied
- cogroup
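A minimal sketch of mapGroups, which computes one result per group from the group's key and an iterator over its records, assuming the Token dataset from the Test Setup section:

// concatenate the product ids per name
// mapGroups takes a function (key, iterator of records) => result
val productsByName = dataset
  .groupByKey(_.name)
  .mapGroups { (name, tokens) => (name, tokens.map(_.productId).mkString(",")) }
// a Dataset[(String, String)], e.g. ("aaa", "100,200") and ("bbb", "200,300")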
Test Setup
This is a setup for learning RelationalGroupedDataset and KeyValueGroupedDataset. Paste it into the Spark Shell using :paste.
import spark.implicits._
case class Token(name: String, productId: Int, score: Double)
val data = Token("aaa", 100, 0.12) ::
Token("aaa", 200, 0.29) ::
Token("bbb", 200, 0.53) ::
Token("bbb", 300, 0.42) :: Nil
val dataset = data.toDS.cache // (1)

(1) Cache the dataset so the following queries won't load/recompute data over and over again.